{
  "metadata" : {
    "name" : "Spark Example",
    "user_save_timestamp" : "1970-01-01T00:00:00.000Z",
    "auto_save_timestamp" : "1970-01-01T00:00:00.000Z",
    "language_info" : {
      "name" : "scala",
      "file_extension" : "scala",
      "codemirror_mode" : "text/x-scala"
    },
    "trusted" : true,
    "customLocalRepo" : null,
    "customRepos" : null,
    "customDeps" : null,
    "customImports" : null,
    "customArgs" : null,
    "customSparkConf" : null
  },
  "cells" : [ {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "Pointers to directories and files."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "import sys.process._\nval remoteFile = \"http://med-at-scale.s3.amazonaws.com/spark-training/dj.csv\"\nvar dataDir = \"/tmp\"\nval localFile = s\"${dataDir}/dow.csv\"",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "Download the remote file to the local path using `wget` (which must be installed on the machine running the notebook)."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "s\"wget $remoteFile -O $localFile\" !!",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "sparkContext.getConf.toDebugString",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val lines = sparkContext.textFile(localFile)",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "lines.take(4)",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "Define the case classes we need for the schema, together with their parsers."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "object model extends Serializable {\n\n  object MyDate {\n    val df = new java.text.SimpleDateFormat(\"yyyy-MM-dd\")\n    def parse(field: String): Option[MyDate] = {\n      try {\n        val ts = df.parse(field).getTime\n        field.split(\"-\").map(_.toInt).toList match {\n          case year :: month :: day :: _ => Some(MyDate(year, month, day, ts))\n          case _ => None\n        }\n      } catch {\n        case ex: Throwable => \n          Console.err.println(s\"$ex: datefield = $field\")\n        None\n      }\n    }\n  }\n  case class MyDate(year: Int, month: Int, day: Int, timestamp: Long)\n  \n\n  object Quote {\n    def parse(line: String): Option[Quote] = {\n      val fields = line.trim.split(\"\"\"\\s*,\\s*\"\"\")\n      try {\n        MyDate.parse(fields(1)).map { date => Quote(fields(0), date, fields(2).toDouble)}                \n      } catch {\n        case ex: NumberFormatException =>\n          Console.err.println(s\"$ex: line = $line\")\n          None\n        case ex: IndexOutOfBoundsException =>\n          Console.err.println(s\"$ex: line = $line\")\n          None\n      }\n    }\n  }\n  case class Quote(stock:String, date:MyDate, price:Double)\n\n}\nimport model._",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "Parse each line of the file into a `Quote` object, keeping only the lines that parse successfully."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val quotes = lines.map(Quote.parse).collect{case Some(q) => q}",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "- Import `SQLContext` (a wrapper around `SparkContext` that gives access to Spark SQL functions)\n- Create the `SQLContext`\n- Load its implicit conversions"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "import org.apache.spark.sql.SQLContext\nval sqlContext = new SQLContext(sparkContext)\nimport sqlContext.implicits._ ",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "Now we create a DataFrame from the RDD. The schema is built from the case class definition, including the nested structure."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val quotesdf = sqlContext.createDataFrame(quotes)",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "Print the DataFrame schema"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "quotesdf.printSchema",
    "outputs" : [ ]
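  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "We change the column names by creating a new DataFrame. As a first step, we collect the distinct timestamps in sorted order and pair each one with the timestamp that follows it; these pairs are used below to compute the difference between consecutive quotes."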
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val ts = quotesdf.select(\"date.timestamp\").map(_.getAs[Long](0)).distinct.collect.toList.sorted\nval withNextTs = ts.sliding(2, 1).map(x => (x(0) → x(1))).toList",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true
    },
    "cell_type" : "code",
    "source" : "quotesdf.count",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val scoped = new Serializable {\n  \n  @transient val wnt = withNextTs\n  \n  val bc = sparkContext.broadcast(wnt)\n  \n  val updatedQuotes:Dataset[(String, Long, Double)] = \n      quotesdf\n      .withColumn(\"ts\", $\"date.timestamp\")\n      .flatMap { row: Row =>\n        val ts = row.getAs[Long](3)\n        val wnt = bc.value\n        wnt.find(_._2 == ts) match {\n          case None => List.empty[(String, Long, Double)]\n          case Some((previousTs, _)) => \n            val thisUpdated = (row.getAs[String](0), ts, -1 * row.getAs[Double](2)) \n            val newRow = (row.getAs[String](0), previousTs, row.getAs[Double](2)) \n            \n            if (!wnt.find(_._1 == ts).isDefined)\n              List(newRow)\n            else if (!wnt.find(_._2 == previousTs).isDefined)\n              List(thisUpdated)\n            else\n              List(newRow, thisUpdated)\n        }\n      }\n  val df = updatedQuotes.toDF(\"symbol\", \"ts\", \"close\")\n}\nval duplicatedWithPreviousTs = scoped.df\n\nimport org.apache.spark.sql.functions._\nval diffQuotes = duplicatedWithPreviousTs.groupBy(\"symbol\", \"ts\").agg(sum(\"close\").as(\"diff_close\"))",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : ":markdown \nWe have ${diffQuotes.count} element",
    "outputs" : [ ]
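  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "Write the data to Parquet and JSON.\n\nNote: with the default save mode, these writes fail if the output path already exists, so remove any previous output before re-running the notebook."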
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "diffQuotes.write.parquet(s\"$dataDir/dow.parquet\")",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "diffQuotes.write.json(s\"$dataDir/quotes.json\")",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "Filter rows"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val ibm = diffQuotes.filter($\"symbol\" === \"IBM\" && $\"diff_close\" < -10).orderBy($\"diff_close\".asc)",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val ko = diffQuotes.filter($\"symbol\" === \"KO\" && $\"diff_close\" < -10)",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "quotesdf.filter($\"stock\" === \"KO\").agg(max(\"price\"), min(\"price\"))",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "DataFrames can be cached too"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "ibm.cache()",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "Create a grouping"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val bySymbol = diffQuotes.groupBy(\"symbol\")",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "Apply some aggregation on the grouping"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "bySymbol.count.orderBy($\"count\".desc)",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "Apply several aggregations on the grouping"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "import org.apache.spark.sql.functions._\nbySymbol.agg(\n  count(\"diff_close\").as(\"count\"), \n  min(\"diff_close\").as(\"min\"), \n  max(\"diff_close\").as(\"max\"), \n  mean(\"diff_close\").as(\"avg.\")\n)",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### Now work with SQL..."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val data = sqlContext.read.parquet(s\"$dataDir/dow.parquet\")",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "data.registerTempTable(\"quote\")\ndata.cache()\n()",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "sqlContext.sql(\"\"\"\n  SELECT s.symbol, s.ts, s.diff_close \n  FROM quote s \n  WHERE symbol = 'IBM' ORDER BY s.ts ASC\n\"\"\")",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "sqlContext.sql(\"\"\"\n SELECT q.symbol AS symbol, count(*) as count \n FROM quote q GROUP BY q.symbol \n ORDER BY count DESC\n\"\"\")",
    "outputs" : [ ]
  } ],
  "nbformat" : 4
}
